1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final Dispatcher dispatcher; final @Nullable Proxy proxy; final List<Protocol> protocols;final List<ConnectionSpec> connectionSpecs;final List<Interceptor> interceptors;final List<Interceptor> networkInterceptors;final EventListener.Factory eventListenerFactory;final ProxySelector proxySelector;final CookieJar cookieJar;final @Nullable Cache cache;final @Nullable InternalCache internalCache;final SocketFactory socketFactory;final SSLSocketFactory sslSocketFactory;final CertificateChainCleaner certificateChainCleaner;final HostnameVerifier hostnameVerifier;final CertificatePinner certificatePinner;final Authenticator proxyAuthenticator;final Authenticator authenticator;final ConnectionPool connectionPool;final Dns dns;final boolean followSslRedirects;final boolean followRedirects;final boolean retryOnConnectionFailure;final int callTimeout;final int connectTimeout;final int readTimeout;final int writeTimeout;final int pingInterval;
websocket wikipedia :
利用tcp提供全双工通信 WebSocket is a computer communications protocol, providing full-duplex communication channels over a single TCP connection.
运行在80/443端口上 WebSocket is designed to work over HTTP ports 443 and 80 as well as to support HTTP proxies and intermediaries
Dispatcher - 线程控制 使用Deque控制任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private int maxRequests = 64 ; private int maxRequestsPerHost = 5 ; private @Nullable Runnable idleCallback;private @Nullable ExecutorService executorService;private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque <>();private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque <>();private final Deque<RealCall> runningSyncCalls = new ArrayDeque <>();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void enqueue (AsyncCall call) { synchronized (this ) { readyAsyncCalls.add(call); if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null ) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private boolean promoteAndExecute () { assert (!Thread.holdsLock(this )); List<AsyncCall> executableCalls = new ArrayList <>(); boolean isRunning; synchronized (this ) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); if (runningAsyncCalls.size() >= maxRequests) break ; if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue ; i.remove(); asyncCall.callsPerHost().incrementAndGet(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0 ; } for (int i = 0 , size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; }
1 2 3 4 5 6 7 public synchronized ExecutorService executorService () { if (executorService == null ) { executorService = new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60 , TimeUnit.SECONDS, new SynchronousQueue <>(), Util.threadFactory("OkHttp Dispatcher" , false )); } return executorService; }
AsyncCall - 异步请求 1 2 3 4 final class AsyncCall extends NamedRunnable { private final Callback responseCallback; private volatile AtomicInteger callsPerHost = new AtomicInteger (0 ); }
承接上文的executeOn,使用线程池执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void executeOn (ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false ; try { executorService.execute(this ); success = true ; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException ("executor rejected" ); ioException.initCause(e); transmitter.noMoreExchanges(ioException); responseCallback.onFailure(RealCall.this , ioException); } finally { if (!success) { client.dispatcher().finished(this ); } } }
线程池拿到runnable,调用run,NamedRunnable调用execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override protected void execute () { boolean signalledCallback = false ; transmitter.timeoutEnter(); try { Response response = getResponseWithInterceptorChain(); signalledCallback = true ; responseCallback.onResponse(RealCall.this , response); } catch (IOException e) { if (signalledCallback) { Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this , e); } } catch (Throwable t) { cancel(); if (!signalledCallback) { IOException canceledException = new IOException ("canceled due to " + t); canceledException.addSuppressed(t); responseCallback.onFailure(RealCall.this , canceledException); } throw t; } finally { client.dispatcher().finished(this ); } }
创建拦截器,并开始一层层执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 Response getResponseWithInterceptorChain () throws IOException { List<Interceptor> interceptors = new ArrayList <>(); interceptors.addAll(client.interceptors()); interceptors.add(new RetryAndFollowUpInterceptor (client)); interceptors.add(new BridgeInterceptor (client.cookieJar())); interceptors.add(new CacheInterceptor (client.internalCache())); interceptors.add(new ConnectInterceptor (client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor (forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain (interceptors, transmitter, null , 0 , originalRequest, this , client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); boolean calledNoMoreExchanges = false ; try { Response response = chain.proceed(originalRequest); if (transmitter.isCanceled()) { closeQuietly(response); throw new IOException ("Canceled" ); } return response; } catch (IOException e) { calledNoMoreExchanges = true ; throw transmitter.noMoreExchanges(e); } finally { if (!calledNoMoreExchanges) { transmitter.noMoreExchanges(null ); } } }
interceptor - 拦截器 RealInterceptorChain 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public Response proceed (Request request) throws IOException { return proceed(request, transmitter, exchange); } public Response proceed (Request request, Transmitter transmitter, @Nullable Exchange exchange) throws IOException { RealInterceptorChain next = new RealInterceptorChain (interceptors, transmitter, exchange, index + 1 , request, call, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); return response; }
XXXInterceptor - 对于一般的Interceptor实现 1 2 3 4 5 6 7 8 @Override public Response intercept (Chain chain) throws IOException { Request request = chain.request(); Response response = chain.proceed(request); return response; }
interceptor获得chain,就是整个interceptor的链条,chain的request获取请求,对请求处理后,调用proceed,将请求处理给下一个interceptor,并返回response,在对响应处理后,将response返回。每个interceptor对象配合一个RealInterceptorChain工作。 RealInterceptorChain是一个chain,(也就是interceptor的参数),RealInterceptorChain保存上一级的request
interceptor通过调用RealInterceptorChain的proceed函数传递自己处理的request,proceed函数创建下一个interceptor的RealInterceptorChain,并调用interceptor的intercept,这样下一个intercept又会调用request获取request,然后调用proceed传递处理后的请求,得到response。 每个interceptor调用proceed获得响应并处理后,将自己处理后的请求返回给上一级的RealInterceptorChain.proceed,上一级的RealInterceptorChain.proceed又将其返回给上一级的intercept函数
ConnectionSpec - 连接配置 CertificatePinner - 自签名验证 Authenticator - 登录 connectionPool DNS followxxxredirect pingInterval - websocket的心跳间隔